    
/**
 * File: cn/tx/common/concurrent/ConcurrentHashMapDemo.java
 */
    
package cn.tx.common.concurrent;

    
 /**
 * Description: demo of producer and consumer tasks sharing a
 * java.util.concurrent.ConcurrentHashMap.
 * @author: cuixinfu@timidata.com
 * @version: 1.0
 * @date: 2017-08-22 15:38:23
 * Copyright xingfu999 Corporation 2017
 */

import java.util.Date;  
import java.util.concurrent.ConcurrentLinkedQueue;  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.Executors;  
  
  
/**
 * Demonstrates the concurrent collection {@link java.util.concurrent.ConcurrentHashMap}.
 *
 * <p>Producer tasks run on a pool of 10 threads and each put one entry into a shared map;
 * consumer tasks run on a pool of 5 threads and look entries up by keys taken from a
 * shared queue. Changing the pool sizes in {@link #main(String[])} changes the total
 * processing time.
 *
 * @author xxj
 */
public class ConcurrentHashMapDemo {

    /** Milliseconds a task sleeps after each unit of work to simulate slow processing. */
    private static final long PAUSE_MILLIS = 500L;

    /** Map shared by all producers (writers) and consumers (readers). */
    private static final java.util.concurrent.ConcurrentHashMap<Integer, Integer> store =
            new java.util.concurrent.ConcurrentHashMap<Integer, Integer>();

    /** Keys still waiting to be looked up by a consumer. */
    private static final ConcurrentLinkedQueue<Integer> pendingKeys =
            new ConcurrentLinkedQueue<Integer>();

    /** Counted down exactly once by every producer and consumer task; created in main. */
    private static CountDownLatch doneLatch;

    /**
     * Runs the demo: queues the keys, submits all producer and consumer tasks, waits until
     * every task has counted the latch down, then prints a short summary.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     *         for the tasks to finish
     */
    public static void main(String[] args) throws InterruptedException {
        long startNanos = System.nanoTime();
        int pTotalThread = 100; // total number of producer tasks
        int pActivities = 10;   // number of active threads in the producer pool
        int cTotalThread = 50;  // total number of consumer tasks
        int cActivities = 5;    // number of active threads in the consumer pool
        doneLatch = new CountDownLatch(pTotalThread + cTotalThread);

        initKeys(pTotalThread);
        startProducer(pActivities, pTotalThread);
        startConsumer(cActivities, cTotalThread);

        doneLatch.await(); // wait until every task has counted down
        long elapsedSeconds = (System.nanoTime() - startNanos) / 1000000000L;

        System.out.println("队列为空："+store.isEmpty());
        System.out.println("耗时："+elapsedSeconds);
        System.out.println("同步队列："+doneLatch.getCount());
    }

    /**
     * Fills the key queue with the integers {@code 0 .. size-1}.
     *
     * @param size number of keys to enqueue
     */
    private static void initKeys(int size) {
        for (int i = 0; i < size; i++) {
            pendingKeys.add(i);
        }
    }

    /**
     * Submits {@code totalThread} producer tasks, keyed {@code 0 .. totalThread-1}, to a
     * fixed pool of {@code active} threads.
     *
     * @param active      number of threads in the pool
     * @param totalThread number of producer tasks to submit
     */
    private static void startProducer(int active, int totalThread) {
        java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);
        try {
            for (int i = 0; i < totalThread; i++) {
                pool.execute(new Producer(i));
            }
        } finally {
            // Already-submitted tasks still run; without this the pool's non-daemon
            // worker threads would keep the JVM alive after main returns.
            pool.shutdown();
        }
    }

    /**
     * Submits {@code totalThread} consumer tasks to a fixed pool of {@code active} threads.
     *
     * @param active      number of threads in the pool
     * @param totalThread number of consumer tasks to submit
     */
    private static void startConsumer(int active, int totalThread) {
        java.util.concurrent.ExecutorService pool = Executors.newFixedThreadPool(active);
        try {
            for (int i = 0; i < totalThread; i++) {
                pool.execute(new Consumer());
            }
        } finally {
            // See startProducer: lets the worker threads exit once the queue is drained.
            pool.shutdown();
        }
    }

    /**
     * Sleeps for {@link #PAUSE_MILLIS}.
     *
     * @return {@code true} if the full pause elapsed, {@code false} if the thread was
     *         interrupted (the interrupt flag is restored in that case)
     */
    private static boolean pause() {
        try {
            Thread.sleep(PAUSE_MILLIS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Producer: stores a single {@code key -> key} entry in the shared map.
     *
     * @author xxj
     */
    private static class Producer implements Runnable {
        private final int key;

        Producer(int key) {
            this.key = key;
        }

        @Override
        public void run() {
            try {
                store.put(key, key); // produce
                System.out.println("已创建："+key);
            } finally {
                // Always count down so a failing task cannot leave main waiting forever.
                doneLatch.countDown();
            }
            pause(); // keep the worker busy for a while, as a slow producer would
        }
    }

    /**
     * Consumer: repeatedly takes a key from the shared queue and reads its value from the
     * shared map, until either the map is empty or no keys are left.
     *
     * @author xxj
     */
    private static class Consumer implements Runnable {
        Consumer() {
        }

        @Override
        public void run() {
            try {
                while (!store.isEmpty()) {
                    Integer key = pendingKeys.poll();
                    if (key == null) {
                        // Queue drained, possibly by another consumer after our last check.
                        // ConcurrentHashMap rejects null keys, so stop here.
                        break;
                    }
                    // Single lookup; yields null when the producer for this key has not
                    // run yet, instead of re-printing a value from an earlier iteration.
                    Integer value = store.get(key); // consume
                    System.err.println("消费："+value);
                    if (!pause()) { // wait a little after every consumption
                        break;      // interrupted: stop consuming
                    }
                }
            } finally {
                // Always count down so a failing task cannot leave main waiting forever.
                doneLatch.countDown();
            }
        }
    }
}